package com.mimo.logic.metric.service.impl;

import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import com.mimo.common.dto.Entry;
import com.mimo.common.utils.DateUtil;
import com.mimo.logic.metric.constants.MetricKeys;
import com.mimo.logic.metric.constants.MetricType;
import com.mimo.logic.metric.service.IMetricService;

/**
 * Redis-backed implementation of {@link IMetricService}.
 *
 * <p>Storage layout, as used by the methods below:
 * <ul>
 *   <li>{@link MetricType#User_Active}: one Redis set per day. The set key is obtained by
 *       formatting {@link MetricKeys#LOGIC_METRIC_USER_ACTIVE_KEY} with the day string; the metric
 *       value is the set's cardinality.</li>
 *   <li>Every other supported type: one Redis hash per metric type (see
 *       {@link #getRedisKeyByType(MetricType)}), with the day string as hash field and the counter
 *       as its value.</li>
 * </ul>
 * Day strings are produced with the {@value #KEY_PATTERN} pattern.
 */
@Service
public class MetricServiceImpl implements IMetricService {

  /** Date pattern used to build the per-day set key suffix / hash field. */
  private static final String KEY_PATTERN = "yyyyMMdd";

  @Autowired
  private RedisTemplate<String, String> redisTemplate;

  /**
   * Accumulates the metric by one. Delegates to {@link #accumulate(MetricType, String, long)}.
   *
   * @param type   metric to update; must not be {@code null}
   * @param source value recorded for {@link MetricType#User_Active}; must not be {@code null}
   * @return the metric's value for today after the update
   */
  @Override
  public long accumulate(MetricType type, String source) {
    return this.accumulate(type, source, 1);
  }

  /**
   * Maps a counter-style metric type to the Redis hash key that stores it.
   *
   * @param type metric type; must not be {@code null}
   * @return the Redis key of the hash holding the per-day counters for {@code type}
   * @throws UnsupportedOperationException for {@link MetricType#User_Active}, which is not stored
   *         in a hash
   * @throws IllegalStateException if {@code type} has no key mapping here
   */
  private String getRedisKeyByType(MetricType type) {
    switch (type) {
      case User_Create:
        return MetricKeys.LOGIC_METRIC_USER_CREATE_KEY;
      case User_Login:
        return MetricKeys.LOGIC_METRIC_USER_LOGIN_KEY;
      case Msg_P2P_Upstream:
        return MetricKeys.LOGIC_METRIC_MSG_P2P_UPSTREAM_KEY;
      case Msg_P2P_Downstream:
        return MetricKeys.LOGIC_METRIC_MSG_P2P_DOWNSTREAM_KEY;
      case Msg_Room_Upstream:
        return MetricKeys.LOGIC_METRIC_MSG_ROOM_UPSTREAM_KEY;
      case Msg_Room_Downstream:
        return MetricKeys.LOGIC_METRIC_MSG_ROOM_DOWNSTREAM_KEY;
      case Room_Create:
        return MetricKeys.LOGIC_METRIC_ROOM_CREATE_KEY;
      case User_Active:
        throw new UnsupportedOperationException("User_Active Key 计算不在此方法上支持");
      default:
        // Name the offending constant so a newly added MetricType is easy to diagnose.
        throw new IllegalStateException("No Redis key mapping for metric type: " + type);
    }
  }

  /**
   * Reads the per-day values of a metric for every day from {@code begin} to {@code end},
   * stepping one calendar day at a time while {@code DateUtil.isLessEqualThan(day, end)} holds.
   *
   * @param type  metric to read; must not be {@code null}
   * @param begin first day of the range
   * @param end   last day of the range
   * @return one entry per day, in ascending day order, keyed by the day string; a day with no
   *         stored value is reported as {@code 0}. Empty when the range contains no day.
   * @throws NullPointerException if {@code type} is {@code null}
   */
  @Override
  public Collection<Entry<String, Long>> getMetric(MetricType type, Date begin, Date end) {
    Objects.requireNonNull(type, "type");

    // ArrayList: the keys are read back by index below, which is O(n) per access on a LinkedList.
    List<String> dateKeys = new ArrayList<>();
    for (Date day = begin; DateUtil.isLessEqualThan(day, end); day = DateUtil.addField(day, Calendar.DATE, 1)) {
      dateKeys.add(DateUtil.format(KEY_PATTERN, day));
    }
    if (dateKeys.isEmpty()) {
      return new ArrayList<>();
    }

    if (type == MetricType.User_Active) {
      // One set per day: the metric value is the cardinality of that day's set.
      return dateKeys.stream()
          .map(
              k -> new Entry<>(k,
                  Optional.ofNullable(
                      redisTemplate.opsForSet().size(MessageFormat.format(MetricKeys.LOGIC_METRIC_USER_ACTIVE_KEY, k)))
                      .orElse(0L)))
          .collect(Collectors.toList());
    }

    // Counter metrics: fetch all requested day fields of the type's hash in a single call.
    String key = getRedisKeyByType(type);
    List<String> elements = redisTemplate.<String, String> opsForHash().multiGet(key, dateKeys);
    Collection<Entry<String, Long>> ret = new ArrayList<>(elements.size());
    for (int i = 0; i < elements.size(); i++) {
      // A null element means the hash has no field for that day.
      long value = Optional.ofNullable(elements.get(i)).map(Long::valueOf).orElse(0L);
      ret.add(new Entry<>(dateKeys.get(i), value));
    }
    return ret;
  }

  /**
   * Accumulates today's value of a metric.
   *
   * <p>For {@link MetricType#User_Active}, {@code source} is added to today's set and the set's
   * cardinality is returned; {@code delta} is validated but otherwise unused. For every other
   * supported type, today's field of the type's hash is incremented by {@code delta}.
   *
   * @param type   metric to update; must not be {@code null}
   * @param source value added to the set for {@link MetricType#User_Active}; must not be
   *               {@code null}
   * @param delta  increment; must be greater than zero
   * @return today's value of the metric after the update
   * @throws NullPointerException     if {@code type} or {@code source} is {@code null}
   * @throws IllegalArgumentException if {@code delta} is not positive
   */
  @Override
  public long accumulate(MetricType type, String source, long delta) {
    Objects.requireNonNull(type, "type");
    Objects.requireNonNull(source, "source");
    Assert.isTrue(delta > 0, "delta 必须大于0");

    String today = DateUtil.format(KEY_PATTERN, new Date());
    if (type == MetricType.User_Active) {
      // Encode explicitly as UTF-8 instead of the platform default charset, so the bytes written
      // here do not depend on the JVM's file.encoding.
      // NOTE(review): assumes the template's key serializer is a UTF-8 string serializer, so this
      // key matches the one getMetric reads through opsForSet() - confirm the template config.
      byte[] key = MessageFormat.format(MetricKeys.LOGIC_METRIC_USER_ACTIVE_KEY, today)
          .getBytes(StandardCharsets.UTF_8);
      byte[] member = source.getBytes(StandardCharsets.UTF_8);
      // Pipeline SADD + SCARD; the result list holds one entry per command, in order.
      List<Object> data = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
        connection.sAdd(key, member);
        connection.sCard(key);
        return null;
      });
      // Index 1 is the SCARD reply, i.e. the set size after the add.
      return Optional.ofNullable(data.get(1)).map(Long.class::cast).orElse(0L);
    }

    String key = getRedisKeyByType(type);
    return redisTemplate.<String, String> opsForHash().increment(key, today, delta);
  }
}
